This homework is about New York taxi trips. Here is something from Todd Schneider:
The New York City Taxi & Limousine Commission has released a detailed historical dataset covering over 1 billion individual taxi trips in the city from January 2009 through December 2019. Taken as a whole, the detailed trip-level data is more than just a vast list of taxi pickup and drop off coordinates: it's a story of a City. How bad is the rush hour traffic from Midtown to JFK? Where does the Bridge and Tunnel crowd hang out on Saturday nights? What time do investment bankers get to work? How has Uber changed the landscape for taxis? The dataset addresses all of these questions and many more.
The NY taxi trips dataset has been explored by a series of distinguished data scientists. The dataset is available on Amazon S3 (Amazon's cloud storage service). The link for each file has the following form:
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_{year}-{month}.csv
There is one CSV file for each NY taxi service (yellow, green, fhv) and each calendar month (replacing {year} and {month} by the desired ones).
Each file is moderately large, a few gigabytes.
The full dataset is relatively large if it has to be handled on a laptop (several hundred gigabytes).
You will focus on the yellow taxi service and one pair of months, one from 2015 and one from 2018.
Between those two years, for-hire vehicle (FHV) services have taken off and carved out a huge market share.
Whatever framework you use, CSV files prove hard to handle.
After downloading the appropriate files (this takes time, but it is routine), a first step will consist in converting the CSV files into a more Spark-friendly format such as Parquet.
Saving into one of those formats requires decisions about bucketing, partitioning, and so on. Such decisions influence performance. It is your call. Many people have been working on this dataset.
Download the files corresponding to "yellow" taxis for the years 2015 and 2018. Download at least one month (the same one) for each of 2015 and 2018; download all of them if your internet connection allows.
Hint. The 12 CSV files for 2015 are about 23 GB in total, but the corresponding Parquet file, if you can create it for all 12 months, is only about 3 GB.
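The download itself can be scripted from the URL template above. The sketch below is only an illustration (the helper names are ours, not part of the assignment); since each file weighs several gigabytes, it streams the response to disk in chunks instead of holding it in memory.

```python
import requests

BASE = "https://s3.amazonaws.com/nyc-tlc/trip+data"

def trip_url(service: str, year: int, month: int) -> str:
    """Build the S3 URL of one monthly CSV, following the template above."""
    return f"{BASE}/{service}_tripdata_{year}-{month:02d}.csv"

def download(service: str, year: int, month: int, dest_dir: str = ".") -> str:
    """Stream one monthly file to disk, 1 MB at a time."""
    url = trip_url(service, year, month)
    path = f"{dest_dir}/{service}_tripdata_{year}-{month:02d}.csv"
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(path, "wb") as f:
            for chunk in r.iter_content(chunk_size=1 << 20):
                f.write(chunk)
    return path

# Example: fetch June for both years of the yellow service
# for year in (2015, 2018):
#     download("yellow", year, 6)
```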
You might need the following stuff in order to work with GPS coordinates and to plot things easily.
#!pip install geojson geopandas plotly geopy
#!pip install descartes contextily ipyleaflet
# Output every expression result in each cell, not only the last one
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
# import the usual suspects
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import json
import requests
from pathlib import Path
import sys
import timeit
import calendar
#%matplotlib inline
import seaborn as sns
sns.set_context("notebook", font_scale=1.2)
# Plotly
import plotly
import plotly.express as px
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode(connected=True)
import plotly.figure_factory as ff
from plotly.subplots import make_subplots
# Ipyleaflet
import ipyleaflet
from ipyleaflet import Map, basemaps, Heatmap, linear
from random import uniform
#Geopandas
import geopandas
import contextily as ctx
# Spark
import pyspark
import pyspark.sql.functions as fn
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col
from pyspark.sql.catalog import Catalog
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
# Configure driver/executor memory for the Spark session.
# Rule of thumb: leave at least 1-2 GB of RAM to the OS and other processes
# (e.g. with 16 GB of RAM, giving Spark ~12 GB is reasonable; adjust the
# values below to your machine).
# Note: memory settings must be set BEFORE the underlying SparkContext is
# created. If a session already exists, getOrCreate() reuses it and silently
# ignores new memory settings, so stop any existing session first.
spark = (SparkSession
         .builder
         .config("spark.executor.memory", "4g")
         .config("spark.driver.memory", "4g")
         .appName("NYC Taxis")
         .getOrCreate())
For this homework we will let you decide on the tools to use (except for Spark) and find out information by yourself (but don't hesitate to ask questions on the Slack channel).
We want to organize the data on a per-year and per-service basis: one Parquet file for each year and each taxi service, since Parquet is much more efficient than CSV.
Hint. Depending on your internet connection and your laptop, you can restrict yourself to the "yellow" service and one month each of 2015 and 2018.
CSV files can contain corrupted lines. You may have to do some work to perform ETL (Extract-Transform-Load) and obtain a properly typed data frame.
You are invited to proceed as follows:
Hint. Don't forget to ask Spark to use the memory and resources available on your computer.
Hint. Don't forget that you should specify a partitioning column and a number of partitions when creating the Parquet files.
Hint. Note that the schemas of the 2015 and 2018 data are different...
Hint. When working on this, ask yourself and answer the following questions:
What is the StorageLevel of the dataframe after reading the CSV files? Since storage was done using the disk, after reading the two CSV files for 2015 we obtained a total of 3.4 GB for the StorageLevel of the dataframes.
How many partitions? We decided to partition by the columns Month and Day, which yields 2 months of 30 or 31 days; once the ETL is done, our Parquet files have 61 partitions (30 for June plus 31 for July).
Is partition pruning possible? Yes, and it is recommended: it optimizes even further the amount of data we work with inside the Parquet file, since we get to choose which parts of the Parquet file to load depending on the analysis we want to perform. In our case we wanted to analyze only one month of data, so we were able to load only that part of our Parquet files.
Why does this matter? Because it allows us to select only the pieces of the Parquet file we are interested in analyzing instead of loading the whole file. Parquet is already a better choice than its CSV counterpart, but loading only the relevant fraction of the data greatly increases speed. Modifying the number of partitions controls how the Parquet file tree is divided, which yields better loading and processing times, especially with huge amounts of data.
# Loading months of June and July for 2015
df_15_06 = spark.read\
.format('csv')\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.option("sep", ",")\
.load("yellow_tripdata_2015-06.csv")
df_15_07 = spark.read\
.format('csv')\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.option("sep", ",")\
.load("yellow_tripdata_2015-07.csv")
df_15_06.printSchema()
df_15_07.printSchema()
# Dataframe union for year 2015 months June and July
df_15_csv = df_15_06.union(df_15_07)
# Rounding the columns for coordinates to be able to make more efficient groupings and filtering of zones
# Adding columns Day Month and Year for partition of parquet files
df_15_csv = df_15_csv\
.withColumn("pickup_longitude", fn.round("pickup_longitude", 3))\
.withColumn("pickup_latitude", fn.round("pickup_latitude",3))\
.withColumn("dropoff_longitude", fn.round("dropoff_longitude",3))\
.withColumn("dropoff_latitude", fn.round("dropoff_latitude",3))\
.withColumn("Day", fn.dayofmonth("tpep_pickup_datetime"))\
.withColumn("Month", fn.month("tpep_pickup_datetime"))\
.withColumn("Year", fn.year("tpep_pickup_datetime"))
df_15_csv.printSchema()
# Loading months of June and July for 2018
df_18_06 = spark.read\
.format('csv')\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.option("sep", ",")\
.load("yellow_tripdata_2018-06.csv")
df_18_07 = spark.read\
.format('csv')\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.option("sep", ",")\
.load("yellow_tripdata_2018-07.csv")
df_18_06.printSchema()
df_18_07.printSchema()
# Dataframe union for year 2018 months June and July
df_18_csv = df_18_06.union(df_18_07)
# Adding columns Day Month and Year for partition of parquet files
df_18_csv = df_18_csv.withColumn("Day", fn.dayofmonth("tpep_pickup_datetime"))\
.withColumn("Month", fn.month("tpep_pickup_datetime"))\
.withColumn("Year", fn.year("tpep_pickup_datetime"))
# Filtering of rows due to corrupt data in year 2018, where there are more months than expected
df_18_csv = df_18_csv.where("(Month = 6 OR Month = 7) AND Year = 2018")
# Creation of parquet file for 2015 yellow taxi data, we partition by month and day
df_15_csv.repartition("Month", "Day").write.partitionBy("Month", "Day").parquet("parquets/yellow_tripdata_2015.parquet")
# Creation of parquet file for 2018 yellow taxi data, we partition by month and day
df_18_csv.repartition("Month", "Day").write.partitionBy("Month", "Day").parquet("parquets/yellow_tripdata_2018.parquet")
From now on, you will be using the parquet files you created for 2015.
We shall visualize several features of taxi traffic during one calendar month in 2015 and the same calendar month in 2018.
Hint. In order to build appealing graphics, you may stick to matplotlib + seaborn; you can also use plotly, which is widely used to build interactive graphics, or any other tool you prefer.
df_15_06_pds = pd.read_parquet("parquets/yellow_tripdata_2015.parquet/Month=6")
df_15_06_spark = spark.read.parquet("parquets/yellow_tripdata_2015.parquet/Month=6")
#df_2015_June.select("VendorID", "passenger_count", "tpep_pickup_datetime").show()
The following longitudes and latitudes encompass Newark and JFK airports, northern Manhattan, and the Verrazzano bridge.
long_min = -74.10
long_max = -73.70
lat_min = 40.58
lat_max = 40.90
Compute the distribution of passenger_count and make a plot of it.
query1 = f"""
(pickup_longitude BETWEEN {long_min} AND {long_max} AND pickup_latitude BETWEEN {lat_min} AND {lat_max})
AND (dropoff_longitude BETWEEN {long_min} AND {long_max} AND dropoff_latitude BETWEEN {lat_min} AND {lat_max})"""
df_coordinates = df_15_06_spark.where(query1)
filtered_count = df_coordinates.groupBy("passenger_count").count().toPandas()
fig = px.bar(filtered_count, x='passenger_count', y='count', hover_data=['count', 'passenger_count'],
color='count',
labels={'passenger_count':'# of passengers', 'count':'# of trips'}, height=600,
color_continuous_scale=px.colors.sequential.Viridis
)
fig.update_layout(
title="Number of trips for different passenger occupancies",
xaxis_title="Number of passengers",
yaxis_title="Number of trips")
fig.show()
Trips with $0$ passengers or more than $7$ passengers are pretty rare. We suspect these to be outliers. We need to explore these trips further in order to understand what might be wrong with them.
zero_passenger_df = df_15_06_spark.where("passenger_count = 0").toPandas()
plus6_passenger_df = df_15_06_spark.where("passenger_count > 6").toPandas()
When we analyze the statistics for the column trip_distance, we see that the maximum distance for these trips is 37 miles, with a mean of 2.36 miles, which does not look like a real trip.
When we analyze the statistics for the column total_amount, the total payment for the trip, we see that the maximum value is 500 dollars but the minimum is negative, which makes us question why these records were stored at all.
We can also see repeated records with opposite values of total_amount that cancel each other out in the final total.
zero_passenger_df.trip_distance.describe()
zero_passenger_df.total_amount.describe()
These trips should not exist either, since a New York taxi can carry at most 5-6 people, which suggests that these trips were either illegal or recorded for another purpose.
When we analyze the statistics for the column trip_distance, we see that the maximum distance for these trips is 40 miles, with a mean of 5.36 miles, which could be a real trip, but the percentiles are all at 0 miles.
When we analyze the statistics for the column total_amount, we see that the maximum value is 133 dollars but the minimum is negative, which again makes us question why these records were stored.
We can also see repeated records with opposite values of total_amount that cancel each other out in the final total.
plus6_passenger_df.trip_distance.describe()
plus6_passenger_df.total_amount.describe()
The largest distance travelled this month is 10,083,318 miles, whilst the distance from Earth to the Moon is 238,900 miles. This means that someone travelled to the Moon and back more than 21 times in one trip!
max_distance = df_coordinates.agg({"trip_distance": "max"}).collect()[0][0]
print("Largest distance travelled this month: {:,} miles".format(max_distance))
print("Distance from Earth to the Moon: 238,900 miles")
Plot the distribution of trip_distance (using a histogram for instance) during year 2015. Focus on trips with non-zero trip distance and trip distance less than 30 miles.
distance_travelling = df_15_06_pds[(df_15_06_pds['trip_distance'] > 0) & (df_15_06_pds['trip_distance'] < 30)]
#distance_travelling
fig = plt.figure(figsize=(10,5))
sns.set()
g = sns.distplot(distance_travelling["trip_distance"], color="purple")
g.set_title("Trip distance distribution for distances between 0 and 30 miles")
plt.xlabel("Distance in miles")
plt.ylabel("Proportion")
plt.show()
Let's look at what Spark does for these computations. Use the explain method or have a look at the Spark UI to analyze the job.
df_15_06_spark.where("passenger_count = 0").explain(True)
In the optimized logical plan, Spark performs optimizations itself. It sees there is no need for two separate filters: the same work can be done with a single filter combining the predicates with an and operator, so it executes only one filter.
In an RDBMS we would probably see the system join information based on the table schemas and filter the data further, optimizing the query through relational algebra.
The physical plan first defines a "Project" in order to perform the queries. In an RDBMS we would not see the PartitionFilters and PushedFilters for the selected data.
What about HashAggregate and Exchange hashpartitioning? Normally each job takes 1 to 2 stages, each one with a different number of tasks, around 220 tasks. In the case of the filter it took 3 to 5 stages to perform the jobs.
HashAggregateExec is a unary physical operator (i.e. with one child physical operator) for hash-based aggregation; it is created indirectly, through AggUtils.createAggregate.
Are there shuffle operations? If yes, how many? Yes: in most operations a shuffle read and write is performed by Spark's physical plan. Depending on the task, the memory used for the shuffle varies but stays around ~300 KB.
When an action is applied to the RDD, a job is created. A job is the main unit of work submitted to Spark. Jobs are divided into stages depending on how they can be carried out separately (mainly on shuffle boundaries). These stages are in turn divided into tasks, the smallest unit of work, executed by the executors.
In our case, the number of tasks depends on the job being performed, but stays around 220, 450, or 1500 tasks per stage.
Now, compute the following and produce relevant plots:
distance_travelling["Week_day"] = distance_travelling['tpep_pickup_datetime'].dt.day_name()
fig, axes = plt.subplots(nrows=7 , ncols=1, figsize=(15, 40))
for i, day in enumerate(calendar.day_name[0:]):
sns.distplot(distance_travelling[distance_travelling["Week_day"] == day]["trip_distance"], ax=axes[i], color="purple")\
.set(xlabel='Trip Distance', ylabel='Proportion', title=f"{day} trip distribution")
fig.subplots_adjust(top=0.9, hspace = .6)
plt.show()
df_coordinates.select("pickup_longitude", "pickup_latitude").distinct().count()
# Borough boundaries; the row order in this GeoJSON file is assumed to be
# Bronx, Staten Island, Brooklyn, Queens, Manhattan
ny = geopandas.read_file("BoroughBoundaries.geojson")
bronx = ny["geometry"].iloc[0].bounds
staten = ny["geometry"].iloc[1].bounds
brooklyn = ny["geometry"].iloc[2].bounds
queens = ny["geometry"].iloc[3].bounds
manhattan = ny["geometry"].iloc[4].bounds
#Bronx
bronx_long_min = bronx[0]
bronx_long_max = bronx[2]
bronx_lat_min = bronx[1]
bronx_lat_max = bronx[3]
#Staten
staten_long_min = staten[0]
staten_long_max = staten[2]
staten_lat_min = staten[1]
staten_lat_max = staten[3]
#Brooklyn
brooklyn_long_min = brooklyn[0]
brooklyn_long_max = brooklyn[2]
brooklyn_lat_min = brooklyn[1]
brooklyn_lat_max = brooklyn[3]
#Queens
queens_long_min = queens[0]
queens_long_max = queens[2]
queens_lat_min = queens[1]
queens_lat_max = queens[3]
#Manhattan
manhattan_long_min = manhattan[0]
manhattan_long_max = manhattan[2]
manhattan_lat_min = manhattan[1]
manhattan_lat_max = manhattan[3]
querybronx = f"(pickup_longitude BETWEEN {bronx_long_min} AND {bronx_long_max} AND pickup_latitude BETWEEN {bronx_lat_min} AND {bronx_lat_max})"
queryqueens = f"(pickup_longitude BETWEEN {queens_long_min} AND {queens_long_max} AND pickup_latitude BETWEEN {queens_lat_min} AND {queens_lat_max})"
querymanhattan = f"(pickup_longitude BETWEEN {manhattan_long_min} AND {manhattan_long_max} AND pickup_latitude BETWEEN {manhattan_lat_min} AND {manhattan_lat_max})"
querystaten = f"(pickup_longitude BETWEEN {staten_long_min} AND {staten_long_max} AND pickup_latitude BETWEEN {staten_lat_min} AND {staten_lat_max})"
querybrooklyn = f"(pickup_longitude BETWEEN {brooklyn_long_min} AND {brooklyn_long_max} AND pickup_latitude BETWEEN {brooklyn_lat_min} AND {brooklyn_lat_max})"
distinct_profits = df_coordinates.groupBy("pickup_longitude", "pickup_latitude").agg({"total_amount":"sum", "tip_amount":"sum"})
df_bronx = distinct_profits.where(querybronx).withColumn("zone", fn.lit("Bronx"))
df_manhattan = distinct_profits.where(querymanhattan).withColumn("zone", fn.lit("Manhattan"))
df_queens = distinct_profits.where(queryqueens).withColumn("zone", fn.lit("Queens"))
df_brooklyn = distinct_profits.where(querybrooklyn).withColumn("zone", fn.lit("Brooklyn"))
df_staten = distinct_profits.where(querystaten).withColumn("zone", fn.lit("Staten Island"))
boroughs = df_bronx.union(df_manhattan)
boroughs = boroughs.union(df_queens)
boroughs = boroughs.union(df_brooklyn)
boroughs = boroughs.union(df_staten)
boroughs = boroughs.groupBy("zone").agg({"sum(total_amount)":"sum", "sum(tip_amount)":"sum"})
boroughs = boroughs.toPandas()
boroughs = boroughs.rename(columns={"sum(sum(tip_amount))": "tips", "sum(sum(total_amount))": "profits"})
fig = go.Figure()
fig.add_trace(go.Scatter(x=boroughs["zone"], y=boroughs["tips"],
mode='lines+markers',
name='Tips',
line_shape='spline'))
fig.add_trace(go.Scatter(x=boroughs["zone"], y=boroughs["profits"],
mode='lines+markers',
name='Profits',
line_shape='spline'))
fig.update_layout(
title="Tips and profits for each NY Borough",
xaxis_title="New York City Boroughs",
yaxis_title="Dollars")
fig.show()
Consider one month of trips data from yellow taxis for each year
df_15_06_spark = spark.read.parquet("parquets/yellow_tripdata_2015.parquet/Month=6")
df_18_06_spark = spark.read.parquet("parquets/yellow_tripdata_2018.parquet/Month=6")
Compute and plot the following time series indexed by day of the week and hour of day:
df_15_06_spark = df_15_06_spark.withColumn("week_day", fn.date_format("tpep_pickup_datetime", "EEEE")).withColumn("Hour", fn.hour("tpep_pickup_datetime"))
df_18_06_spark = df_18_06_spark.withColumn("week_day", fn.date_format("tpep_pickup_datetime", "EEEE")).withColumn("Hour", fn.hour("tpep_pickup_datetime"))
number_pickups_weekday_hour_15 = df_15_06_spark.groupby("week_day", "Hour").count()
number_pickups_weekday_hour_18 = df_18_06_spark.groupby("week_day", "Hour").count()
pickups_df_15 = number_pickups_weekday_hour_15.toPandas()
pickups_df_18 = number_pickups_weekday_hour_18.toPandas()
# This is so the days of the week are shown in the correct order in the plot
pickups_df_15["week_day"] = pd.Categorical(pickups_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
pickups_df_15 = pickups_df_15.sort_values(["week_day","Hour"])
# This is so the days of the week are shown in the correct order in the plot
pickups_df_18["week_day"] = pd.Categorical(pickups_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
pickups_df_18 = pickups_df_18.sort_values(["week_day","Hour"])
fig = px.line(pickups_df_15, x='Hour', y='count', color = "week_day", line_shape='spline',
labels={'count':'# of pickups', 'week_day':'Day '})
fig.update_layout(
title="Number of pickups per hour for every day of the week in June 2015",
xaxis_title="Hour of the day",
yaxis_title="Number of pickups",
legend_title_text='Day of the week'
)
fig.show()
fig = px.line(pickups_df_18, x='Hour', y='count', color = "week_day", line_shape='spline',
labels={'count':'# of pickups', 'week_day':'Day '})
fig.update_layout(
title="Number of pickups per hour for every day of the week in June 2018",
xaxis_title="Hour of the day",
yaxis_title="Number of pickups",
legend_title_text='Day of the week'
)
fig.show()
avg_fare_spark_15 = df_15_06_spark.groupby("week_day", "Hour").agg({"total_amount": "avg"})
avg_fare_spark_18 = df_18_06_spark.groupby("week_day", "Hour").agg({"total_amount": "avg"})
avg_fare_df_15 = avg_fare_spark_15.toPandas()
avg_fare_df_18 = avg_fare_spark_18.toPandas()
# This is so the days of the week are shown in the correct order in the plot
avg_fare_df_15["week_day"] = pd.Categorical(avg_fare_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
avg_fare_df_15 = avg_fare_df_15.sort_values(["week_day","Hour"])
# This is so the days of the week are shown in the correct order in the plot
avg_fare_df_18["week_day"] = pd.Categorical(avg_fare_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
avg_fare_df_18 = avg_fare_df_18.sort_values(["week_day","Hour"])
fig = px.line(avg_fare_df_15, x='Hour', y='avg(total_amount)', color = "week_day", line_shape='spline',
labels={'avg(total_amount)':'avg fare', 'week_day':'Day '})
fig.update_layout(
title="Average fare per hour for every day of the week in June 2015",
xaxis_title="Hour of the day",
yaxis_title="Average fare in dollars",
legend_title_text='Day of the week'
)
fig.show()
fig = px.line(avg_fare_df_18, x='Hour', y='avg(total_amount)', color = "week_day",line_shape='spline',
labels={'avg(total_amount)':'avg fare', 'week_day':'Day '})
fig.update_layout(
title="Average fare per hour for every day of the week in June 2018",
xaxis_title="Hour of the day",
yaxis_title="Average fare in dollars",
legend_title_text='Day of the week'
)
fig.show()
duration_spark_15 = df_15_06_spark.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
duration_spark_18 = df_18_06_spark.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
avg_trip_spark_15 = duration_spark_15.groupby("week_day", "Hour").agg({"duration": "avg"})
avg_trip_spark_18 = duration_spark_18.groupby("week_day", "Hour").agg({"duration": "avg"})
avg_trip_df_15 = avg_trip_spark_15.toPandas()
avg_trip_df_18 = avg_trip_spark_18.toPandas()
# This is so the days of the week are shown in the correct order in the plot
avg_trip_df_15["week_day"] = pd.Categorical(avg_trip_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
avg_trip_df_15 = avg_trip_df_15.sort_values(["week_day","Hour"])
# This is so the days of the week are shown in the correct order in the plot
avg_trip_df_18["week_day"] = pd.Categorical(avg_trip_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
avg_trip_df_18 = avg_trip_df_18.sort_values(["week_day","Hour"])
fig = px.line(avg_trip_df_15, x='Hour', y='avg(duration)', color = "week_day", line_shape='spline',
labels={'avg(duration)':'avg trip duration ', 'week_day':'Day '})
fig.update_layout(
title="Average trip duration per hour for every day of the week in June 2015",
xaxis_title="Hour of the day",
yaxis_title="Average trip duration in minutes",
legend_title_text='Day of the week'
)
fig.show()
fig = px.line(avg_trip_df_18, x='Hour', y='avg(duration)', color = "week_day", line_shape='spline',
labels={'avg(duration)':'avg trip duration ', 'week_day':'Day '})
fig.update_layout(
title="Average trip duration per hour for every day of the week in June 2018",
xaxis_title="Hour of the day",
yaxis_title="Average trip duration in minutes",
legend_title_text='Day of the week'
)
fig.show()
avg_ongoing_trips_15 = df_15_06_spark.groupBy("Day", "Hour").count()
avg_ongoing_trips_18 = df_18_06_spark.groupBy("Day", "Hour").count()
avg_ongoing_trips_15 = avg_ongoing_trips_15.groupBy("Hour").agg({"count":"avg"})
avg_ongoing_trips_18 = avg_ongoing_trips_18.groupBy("Hour").agg({"count":"avg"})
avg_ongoing_trips_15 = avg_ongoing_trips_15.toPandas()
avg_ongoing_trips_18 = avg_ongoing_trips_18.toPandas()
avg_ongoing_trips_15 = avg_ongoing_trips_15.sort_values(by='Hour', ascending=True)
avg_ongoing_trips_18 = avg_ongoing_trips_18.sort_values(by='Hour', ascending=True)
fig = go.Figure()
fig.add_trace(go.Scatter(x=avg_ongoing_trips_15["Hour"], y=avg_ongoing_trips_15["avg(count)"],
mode='lines+markers',
name='2015',
text="trips",
hoverinfo='y+text',
line_shape='spline'))
fig.add_trace(go.Scatter(x=avg_ongoing_trips_18["Hour"], y=avg_ongoing_trips_18["avg(count)"],
mode='lines+markers',
name='2018',
text="trips",
hoverinfo='y+text',
line_shape='spline'))
fig.update_layout(
title="Average number of ongoing trips per hour in June",
legend_title_text='Year',
xaxis_title="Hour of the day",
yaxis_title="Number of trips")
fig.show()
In order to find the longitude and latitude of JFK and Newark airports, as well as the bounding coordinates of Manhattan, you can use a service like geojson.io. Plot the following time series, indexed by day of the week and hour of the day.
#Exact JFK coordinates
JFK_long = -73.78606796264648
JFK_lat = 40.64274482191706
#JFK coordinates
jfk_long_min = -73.83
jfk_long_max = -73.74
jfk_lat_min = 40.62
jfk_lat_max = 40.67
jfk_id = 132
# Midtown coordinates
midtown_long_min = -74.027
midtown_long_max = -73.95
midtown_lat_min = 40.725
midtown_lat_max = 40.77
midtown_ids = (224, 164, 107, 90, 246, 68, 48, 163, 162, 229, 50, 230)
# Function defined to calculate the median of a list of values
def median(values_list):
med = np.median(values_list)
return float(med)
udf_median = fn.udf(median, FloatType())
queryMid_JFK_15 = f"(pickup_longitude BETWEEN {midtown_long_min} AND {midtown_long_max} AND pickup_latitude BETWEEN {midtown_lat_min} AND {midtown_lat_max}) AND (dropoff_longitude BETWEEN {jfk_long_min} AND {jfk_long_max} AND dropoff_latitude BETWEEN {jfk_lat_min} AND {jfk_lat_max})"
midtown_jfk_df_15 = df_15_06_spark.where(queryMid_JFK_15)
queryMid_JFK_18 = f"PULocationID IN {midtown_ids} AND DOLocationID = {jfk_id}"
midtown_jfk_df_18 = df_18_06_spark.where(queryMid_JFK_18)
midtown_jfk_df_15 = midtown_jfk_df_15.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
midtown_jfk_df_18 = midtown_jfk_df_18.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
median_trip_MID_JFK_spark_15 = midtown_jfk_df_15.groupby("week_day", "Hour").agg(udf_median(fn.collect_list(col("duration"))).alias('median_duration'))
median_trip_MID_JFK_spark_18 = midtown_jfk_df_18.groupby("week_day", "Hour").agg(udf_median(fn.collect_list(col("duration"))).alias('median_duration'))
med_trip_MID_JFK_df_15 = median_trip_MID_JFK_spark_15.toPandas()
med_trip_MID_JFK_df_18 = median_trip_MID_JFK_spark_18.toPandas()
# This is so the days of the week are shown in the correct order in the plot
med_trip_MID_JFK_df_15["week_day"] = pd.Categorical(med_trip_MID_JFK_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
med_trip_MID_JFK_df_15 = med_trip_MID_JFK_df_15.sort_values(["week_day","Hour"])
# This is so the days of the week are shown in the correct order in the plot
med_trip_MID_JFK_df_18["week_day"] = pd.Categorical(med_trip_MID_JFK_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
med_trip_MID_JFK_df_18 = med_trip_MID_JFK_df_18.sort_values(["week_day","Hour"])
fig = px.line(med_trip_MID_JFK_df_15, x='Hour', y='median_duration', color = "week_day",line_shape='spline',
labels={'median_duration':'median trip duration', 'week_day':'Day '})
fig.update_layout(
title="Median trip duration from Midtown to JFK airport per day per hour | June 2015",
xaxis_title="Hour of the day",
yaxis_title="Median duration in minutes",
legend_title_text='Day of the week'
)
fig.show()
fig = px.line(med_trip_MID_JFK_df_18, x='Hour', y='median_duration', color = "week_day",line_shape='spline',
labels={'median_duration':'median trip duration', 'week_day':'Day '})
fig.update_layout(
title="Median trip duration from Midtown to JFK airport per day per hour | June 2018",
xaxis_title="Hour of the day",
yaxis_title="Median duration in minutes",
legend_title_text='Day of the week'
)
fig.show()
queryJFK_MID_15 = f"(pickup_longitude BETWEEN {jfk_long_min} AND {jfk_long_max} AND pickup_latitude BETWEEN {jfk_lat_min} AND {jfk_lat_max}) AND (dropoff_longitude BETWEEN {midtown_long_min} AND {midtown_long_max} AND dropoff_latitude BETWEEN {midtown_lat_min} AND {midtown_lat_max})"
jfk_midtown_df_15= df_15_06_spark.where(queryJFK_MID_15)
queryJFK_MID_18 = f"PULocationID = {jfk_id} AND DOLocationID IN {midtown_ids}"
jfk_midtown_df_18 = df_18_06_spark.where(queryJFK_MID_18)
jfk_midtown_df_15 = jfk_midtown_df_15.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
jfk_midtown_df_18 = jfk_midtown_df_18.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
median_trip_JFK_MID_spark_15 = jfk_midtown_df_15.groupby("week_day", "Hour").agg(udf_median(fn.collect_list(col("duration"))).alias('median_duration'))
median_trip_JFK_MID_spark_18 = jfk_midtown_df_18.groupby("week_day", "Hour").agg(udf_median(fn.collect_list(col("duration"))).alias('median_duration'))
median_trip_JFK_MID_df_15 = median_trip_JFK_MID_spark_15.toPandas()
median_trip_JFK_MID_df_18 = median_trip_JFK_MID_spark_18.toPandas()
median_trip_JFK_MID_df_15["week_day"] = pd.Categorical(median_trip_JFK_MID_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
median_trip_JFK_MID_df_15 = median_trip_JFK_MID_df_15.sort_values(["week_day","Hour"])
median_trip_JFK_MID_df_18["week_day"] = pd.Categorical(median_trip_JFK_MID_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
median_trip_JFK_MID_df_18 = median_trip_JFK_MID_df_18.sort_values(["week_day","Hour"])
fig = px.line(median_trip_JFK_MID_df_15, x='Hour', y='median_duration', color = "week_day",line_shape='spline',
labels={'median_duration':'median trip duration', 'week_day':'Day '})
fig.update_layout(
title="Median trip duration from JFK airport to Midtown per day per hour | June 2015",
xaxis_title="Hour of the day",
yaxis_title="Median duration in minutes",
legend_title_text='Day of the week'
)
fig.show()
fig = px.line(median_trip_JFK_MID_df_18, x='Hour', y='median_duration', color = "week_day", line_shape='spline',
labels={'median_duration':'median trip duration', 'week_day':'Day '})
fig.update_layout(
title="Median trip duration from JFK airport to Midtown per day per hour | June 2018",
xaxis_title="Hour of the day",
yaxis_title="Median duration in minutes",
legend_title_text='Day of the week'
)
fig.show()
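The duration column above is computed by casting timestamps to epoch seconds and dividing by 60. The same idea, checked in pandas on made-up timestamps:

```python
import pandas as pd

# Two fake trips (illustrative timestamps, not real data)
trips = pd.DataFrame({
    "tpep_pickup_datetime": pd.to_datetime(["2015-06-01 08:00:00", "2015-06-01 09:15:00"]),
    "tpep_dropoff_datetime": pd.to_datetime(["2015-06-01 08:45:30", "2015-06-01 10:00:00"]),
})

# Difference in seconds, then minutes -- mirrors casting to long and dividing by 60 in Spark
delta = trips["tpep_dropoff_datetime"] - trips["tpep_pickup_datetime"]
trips["duration"] = delta.dt.total_seconds() / 60
print(trips["duration"].tolist())  # [45.5, 45.0]
```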
For this, you will need tools to display maps and to build choropleth maps. Finding the relevant tools is left to you.
A. number of pickups
total_pickups_15_06 = df_15_06_spark.groupBy("pickup_latitude", "pickup_longitude").count()
total_pickups_15_06_df = total_pickups_15_06.toPandas()
#total_pickups_2015_06_df
locationsA = total_pickups_15_06_df.values.tolist()
mapA = Map(center=(40.67, -73.94), zoom=8)
heatmap = Heatmap(locations=locationsA, radius=8, blur=15)
mapA.add_layer(heatmap);
mapA
B. number of dropoffs
total_dropoffs_15_06 = df_15_06_spark.groupBy("dropoff_latitude", "dropoff_longitude").count()
total_dropoffs_15_06.count()
total_dropoffs_15_06_df = total_dropoffs_15_06.toPandas()
#total_dropoffs_2015_06_df
locationsB = total_dropoffs_15_06_df.values.tolist()
mapB = Map(center=(40.67, -73.94), zoom=8)
heatmap = Heatmap(locations=locationsB, radius=8, blur=15)
mapB.add_layer(heatmap);
mapB
C. number of pickups with dropoff at some airport (JFK, LaGuardia, Newark)
# Newark coordinates
newark_long_min = -74.202
newark_long_max = -74.145
newark_lat_min = 40.66
newark_lat_max = 40.711
# LaGuardia coordinates
laguardia_long_min = -73.89
laguardia_long_max = -73.85
laguardia_lat_min = 40.764
laguardia_lat_max = 40.785
airport_dropoff_query = f"""
(dropoff_longitude BETWEEN {jfk_long_min} AND {jfk_long_max} AND dropoff_latitude BETWEEN {jfk_lat_min} AND {jfk_lat_max})
OR
(dropoff_longitude BETWEEN {newark_long_min} AND {newark_long_max} AND dropoff_latitude BETWEEN {newark_lat_min} AND {newark_lat_max})
OR
(dropoff_longitude BETWEEN {laguardia_long_min} AND {laguardia_long_max} AND dropoff_latitude BETWEEN {laguardia_lat_min} AND {laguardia_lat_max})
"""
airport_dropoffs_df = df_15_06_spark.where(airport_dropoff_query)
#airport_dropoffs_df.count()
pickups_to_airport = airport_dropoffs_df.groupBy("pickup_latitude", "pickup_longitude").count()
#pickups_to_airport.count()
pickups_to_airport_df = pickups_to_airport.toPandas()
#pickups_to_airport_df
locationsC = pickups_to_airport_df.values.tolist()
mapC = Map(center=(40.67, -73.94), zoom=8)
heatmap = Heatmap(locations=locationsC, radius=8, blur=15)
mapC.add_layer(heatmap);
mapC
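The airport filter above is just a bounding-box test on coordinates. As a plain-Python sanity check (the box is the LaGuardia rectangle defined above; the test coordinates are illustrative):

```python
def in_box(lat, lon, lat_min, lat_max, lon_min, lon_max):
    """True when (lat, lon) falls inside the rectangle."""
    return lat_min <= lat <= lat_max and lon_min <= lon <= lon_max

# LaGuardia box, copied from the cell above
lga = dict(lat_min=40.764, lat_max=40.785, lon_min=-73.89, lon_max=-73.85)

print(in_box(40.7769, -73.8740, **lga))  # True: a point near the LGA terminals
print(in_box(40.7580, -73.9855, **lga))  # False: Times Square
```

The SQL `BETWEEN ... AND ...` predicates in the Spark query express exactly this test, with the three airport boxes joined by `OR`.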
pdf_18 = spark.read.parquet("parquets/yellow_tripdata_2018.parquet")
zones = geopandas.read_file("NYCTaxiZones.geojson")
zones.head()
zones_info = zones[["objectid", "zone"]].copy()  # .copy() avoids SettingWithCopyWarning on the next line
zones_info["objectid"] = zones_info["objectid"].astype(int)
A. number of pickups in the area
pickups_18 = pdf_18.groupBy("PULocationID").count()
pickups_18_df = pickups_18.toPandas()
pickups_18_df = pd.merge(pickups_18_df, zones_info, how='left', left_on='PULocationID', right_on='objectid')
pickups_18_df = pickups_18_df[["PULocationID", "count", "zone"]]
pickups_18_df.head()
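The left merge that attaches zone names to the counts can be checked on toy data (ids and zone names here are illustrative):

```python
import pandas as pd

counts = pd.DataFrame({"PULocationID": [1, 2, 3], "count": [10, 20, 5]})
zones_info = pd.DataFrame({"objectid": [1, 2], "zone": ["Zone A", "Zone B"]})

# Left join keeps every pickup zone; ids missing from zones_info get NaN for the zone name
merged = pd.merge(counts, zones_info, how="left", left_on="PULocationID", right_on="objectid")
merged = merged[["PULocationID", "count", "zone"]]
print(merged["zone"].tolist())  # ['Zone A', 'Zone B', nan]
```

`how='left'` matters here: an inner join would silently drop any zone id absent from the GeoJSON table.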
fig = px.choropleth(pickups_18_df, geojson=zones,
locations='PULocationID', color='count',
hover_name = 'zone',
labels={'PULocationID':'Taxi zone', 'count': 'Pickups'}
)
fig.update_geos(fitbounds="locations", visible=True)
fig.update_layout(title_text = 'Number of pickups per NYC Taxi area')
fig.show()
B. ratio of number of payments by card/number of cash payments for pickups in the area
payment_ratio_18 = pdf_18.groupBy("PULocationID","payment_type").count()
t1 = payment_ratio_18.where("payment_type = 1").select("PULocationID", fn.col("count").alias("t1"))
t2 = payment_ratio_18.where("payment_type = 2").select("PULocationID", fn.col("count").alias("t2"))
payment_types = t1.join(t2, "PULocationID")
payment_types = payment_types.withColumn("ratio", fn.round(fn.col("t1")/fn.col("t2"), 3))  # card (type 1) over cash (type 2)
#payment_types.show()
payment_ratio = payment_types.toPandas()
payment_ratio = pd.merge(payment_ratio, zones_info, how='left', left_on='PULocationID', right_on='objectid')
payment_ratio = payment_ratio[["PULocationID", "ratio", "zone"]]
payment_ratio.head()
fig = px.choropleth(payment_ratio, geojson=zones,
locations='PULocationID', color='ratio',
hover_name = 'zone',
labels={'PULocationID':'Taxi zone', 'ratio': 'Card/Cash ratio'}
)
fig.update_geos(fitbounds="locations", visible=True)
fig.update_layout(title_text = 'Card/Cash payment ratio per NYC Taxi area')
fig.show()
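The two-filter join above can also be sketched as a pivot in pandas (toy counts; payment_type 1 is credit card and 2 is cash in the TLC data dictionary):

```python
import pandas as pd

# Toy per-zone, per-payment-type counts
rows = pd.DataFrame({
    "PULocationID": [1, 1, 2, 2],
    "payment_type": [1, 2, 1, 2],
    "count": [300, 100, 50, 200],
})

# One column per payment type, then the card/cash ratio per zone
wide = rows.pivot(index="PULocationID", columns="payment_type", values="count")
wide["ratio"] = (wide[1] / wide[2]).round(3)
print(wide["ratio"].tolist())  # [3.0, 0.25]
```

A pivot (or `groupBy().pivot()` in Spark) avoids the two filtered sub-frames and the join, at the cost of NaNs where a payment type is absent in a zone.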
C. ratio of total fare/trip duration for dropoff in the area
pdf_18 = pdf_18.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
doffs_total_fare_18 = pdf_18.groupBy("DOLocationID").agg({"total_amount": "sum"})
doffs_total_duration_18 = pdf_18.groupBy("DOLocationID").agg({"duration": "sum"})
ratio_fare_duration = doffs_total_fare_18.join(doffs_total_duration_18, "DOLocationID")
ratio_fare_duration = ratio_fare_duration.withColumn("ratio", fn.col("sum(total_amount)")/fn.col("sum(duration)"))
ratio_fare_duration = ratio_fare_duration.toPandas()
ratio_fare_duration = pd.merge(ratio_fare_duration, zones_info, how='left', left_on='DOLocationID', right_on='objectid')
ratio_fare_duration = ratio_fare_duration[["DOLocationID", "ratio", "zone"]]
ratio_fare_duration.head()
fig = px.choropleth(ratio_fare_duration, geojson=zones,
locations='DOLocationID', color='ratio',
hover_name = 'zone',
labels={'DOLocationID':'Taxi zone', 'ratio': 'Fare/Duration ratio'}
)
fig.update_geos(fitbounds="locations", visible=True)
fig.update_layout(title_text = 'Total fare-Trip duration ratio per dropoff NYC Taxi area')
fig.show()
Maps with a slider for the hour of the day, where the color is a function of question A and question B.
pdf_18 = pdf_18.withColumn("Hour", fn.hour("tpep_dropoff_datetime"))
with open('NYCTaxiZones.geojson') as json_file:
jsdata = json.load(json_file)
A. average number of dropoffs in the area during that hour of the day
avg_doffs_hour_18 = pdf_18.groupBy("DOLocationID","Hour").count()
#avg_doffs_hour_18.show()
slider3A_df = avg_doffs_hour_18.withColumn("average", fn.col("count") / 61)  # divide by the number of days covered by the sample (61 here)
slider3A_df = slider3A_df.select("DOLocationID", "Hour", "average").toPandas()
slider3A_df = pd.merge(slider3A_df, zones_info, how='left', left_on='DOLocationID', right_on='objectid')
slider3A_df = slider3A_df[["DOLocationID", "Hour", "zone", "average"]]
slider3A_df.head()
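The averaging step is just the total count per (zone, hour) divided by the number of days in the sample; a toy check (assuming a 61-day sample, as in the cell above):

```python
import pandas as pd

# Toy (zone, hour) totals; ids are illustrative
counts = pd.DataFrame({"DOLocationID": [132, 230], "Hour": [8, 8], "count": [6100, 305]})

DAYS = 61  # number of days covered by the sample (assumption carried over from the cell above)
counts["average"] = counts["count"] / DAYS
print(counts["average"].tolist())  # [100.0, 5.0]
```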
# Create a list of choropleth data objects, one per hour
data_sliderA = []
for hour in sorted(slider3A_df.Hour.unique()):  # sorted so step i really is hour i
    # Filter the df by hour; .copy() avoids SettingWithCopyWarning below
    df_hourly = slider3A_df[slider3A_df['Hour'] == hour].copy()
    for col in df_hourly.columns:  # cast columns to string for the text labels
        df_hourly[col] = df_hourly[col].astype(str)
    # plotly hover text uses <br> for line breaks, not \n
    df_hourly['text'] = 'Zone: ' + df_hourly['zone'] + '<br>Average dropoffs: ' + df_hourly['average']
    # Build the data dictionary for the current hour
    hourly_data = dict(
        type='choropleth',
        colorscale='Viridis',
        locations=df_hourly['DOLocationID'],  # the column matched against featureidkey
        z=df_hourly['average'],               # the variable to color-code
        geojson=jsdata,
        featureidkey="properties.objectid",
        text=df_hourly['text'],
        colorbar=dict(title="Average dropoffs"),
    )
    data_sliderA.append(hourly_data)  # one dictionary per slider step
# Steps for the slider
stepsA = []
for i in range(len(data_sliderA)):
    step = dict(method='restyle',
                args=['visible', [False] * len(data_sliderA)],
                label='Hour {}'.format(i))  # label displayed for each step
    step['args'][1][i] = True  # only the i-th trace is visible at this step
    stepsA.append(step)
# I create the 'sliders' object from the 'steps'
slidersA = [dict(active=0, pad={"t": 1}, steps=stepsA)]
# I set up the layout (including slider option)
layoutA = dict(geo=dict(fitbounds="locations", visible=True), sliders=slidersA,
title_text = "Average number of dropoffs per NYC Taxi area every hour")
# I create the figure object:
figA = dict(data=data_sliderA, layout=layoutA)
# Render the figure (plot() writes an HTML file and opens it in the browser)
plotly.offline.plot(figA)
#This will output an HTML file that can be opened in the browser, because opening it in the notebook is too heavy
#plotly.offline.plot(fig, auto_open=True, image = 'png', image_filename="avg_number_dropoffs" ,image_width=2000, image_height=1000,
# filename='sliderA.html', validate=True)
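The slider steps are plain dictionaries that toggle trace visibility, so the pattern can be checked standalone, without plotly:

```python
n_frames = 24  # one choropleth trace per hour of the day

steps = []
for i in range(n_frames):
    step = dict(method="restyle",
                args=["visible", [False] * n_frames],
                label="Hour {}".format(i))
    step["args"][1][i] = True  # only the i-th trace is visible at this step
    steps.append(step)

# Sanity check: each step shows exactly one trace
print(all(sum(s["args"][1]) == 1 for s in steps))  # True
```

This is why the traces must be appended in hour order: step `i` shows trace `i`, and the label `Hour i` is only correct if trace `i` really holds hour `i`.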
B. average ratio of tip over total fare amount for pickups in the area at given hour of the day
ratio_tip_fare = pdf_18.groupBy("DOLocationID","Hour").agg({"total_amount":"sum", "tip_amount":"sum"})
slider3B_df = ratio_tip_fare.withColumn("ratio", fn.col("sum(tip_amount)")/fn.col("sum(total_amount)"))
slider3B_df = slider3B_df.select("DOLocationID", "Hour", "ratio").toPandas()
slider3B_df = pd.merge(slider3B_df, zones_info, how='left', left_on='DOLocationID', right_on='objectid')
slider3B_df = slider3B_df[["DOLocationID", "Hour", "zone", "ratio"]]
slider3B_df.head()
# Create a list of choropleth data objects, one per hour
data_sliderB = []
for hour in sorted(slider3B_df.Hour.unique()):  # sorted so step i really is hour i
    # Filter the df by hour; .copy() avoids SettingWithCopyWarning below
    df_hourly = slider3B_df[slider3B_df['Hour'] == hour].copy()
    for col in df_hourly.columns:  # cast columns to string for the text labels
        df_hourly[col] = df_hourly[col].astype(str)
    # plotly hover text uses <br> for line breaks, not \n
    df_hourly['text'] = 'Zone: ' + df_hourly['zone'] + '<br>Ratio: ' + df_hourly['ratio']
    # Build the data dictionary for the current hour
    hourly_data = dict(
        type='choropleth',
        colorscale='Viridis',
        autocolorscale=False,
        locations=df_hourly['DOLocationID'],  # the column matched against featureidkey
        z=df_hourly['ratio'],                 # the variable to color-code
        geojson=jsdata,
        featureidkey="properties.objectid",
        text=df_hourly['text'],
        colorbar=dict(title="Ratio"),
    )
    data_sliderB.append(hourly_data)  # one dictionary per slider step
## Steps for the slider
stepsB = []
for i in range(len(data_sliderB)):
    step = dict(method='restyle',
                args=['visible', [False] * len(data_sliderB)],
                label='Hour {}'.format(i))  # label displayed for each step
    step['args'][1][i] = True  # only the i-th trace is visible at this step
    stepsB.append(step)
## I create the 'sliders' object from the 'steps'
slidersB = [dict(active=0, pad={"t": 1}, steps=stepsB)]
layoutB = dict(geo=dict(fitbounds="locations", visible=True), sliders=slidersB,
title_text = "Average ratio of tip/fare per dropoff NYC Taxi area every hour")
# I create the figure object:
figB = dict(data=data_sliderB, layout=layoutB)
# Render the figure (plot() writes an HTML file and opens it in the browser)
plotly.offline.plot(figB)
#This will output an HTML file that can be opened in the browser, because opening it in the notebook is too heavy
#plotly.offline.plot(fig, auto_open=True, image = 'png', image_filename="ratio_tip_fare" ,image_width=2000, image_height=1000,
# filename='sliderB.html', validate=True)
spark.stop()